package com.alibaba.csp.sentinel.dashboard.repository.metric.influx;

import com.alibaba.csp.sentinel.dashboard.datasource.entity.InfluxDbMetricEntity;
import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity;
import com.alibaba.csp.sentinel.dashboard.repository.metric.MetricsRepository;
import com.google.common.collect.Lists;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBException;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Repository;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * {@link MetricsRepository} implementation that stores metrics as points in an InfluxDB
 * measurement and reads them back with InfluxQL queries.
 *
 * <p>The target database and measurement names are injected from the
 * {@code spring.influx.db} and {@code spring.influx.measurement} properties. On start-up
 * ({@link #afterPropertiesSet()}) the database is created when {@code show databases} does not
 * list it, and batching is enabled on the shared {@link InfluxDB} client.
 *
 * @author tom
 */
@Repository
public class InfluxDbMetricsRepository implements MetricsRepository<MetricEntity>, InitializingBean {

    private static final Logger log = LoggerFactory.getLogger(InfluxDbMetricsRepository.class);

    /** How far back {@link #listResourcesOfApp(String)} looks: 15 minutes, in milliseconds. */
    private static final long RESOURCE_LOOKBACK_MILLIS = 15L * 60 * 1000;

    private final InfluxDB influxDB;
    private final String database;
    private final String measurement;
    private final InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();

    /**
     * @param influxDB    client used for every read and write
     * @param database    database name; defaults to {@code sentinel_dashboard}
     * @param measurement measurement name; defaults to {@code resources_metrics}
     */
    public InfluxDbMetricsRepository(InfluxDB influxDB, @Value("${spring.influx.db:sentinel_dashboard}") String database
            , @Value("${spring.influx.measurement:resources_metrics}") String measurement) {
        this.influxDB = influxDB;
        this.database = database;
        this.measurement = measurement;
    }

    /**
     * Writes a single metric as one point of the configured measurement.
     *
     * <p>Best effort: any exception is logged and swallowed, so the caller is never interrupted
     * by a storage failure.
     *
     * @param metric the metric to store
     */
    @Override
    public void save(MetricEntity metric) {
        try {
            influxDB.write(toPoint(metric));
            log.debug("metric save => {}", metric);
        } catch (Exception e) {
            log.error("metric save error", e);
        }
    }

    /**
     * Writes every metric as its own point and then flushes the client.
     *
     * <p>Best effort: the first exception aborts the remaining writes, is logged and swallowed.
     *
     * @param metrics the metrics to store
     */
    @Override
    public void saveAll(Iterable<MetricEntity> metrics) {
        try {
            for (MetricEntity metric : metrics) {
                influxDB.write(toPoint(metric));
            }
            influxDB.flush();
            log.debug("metric saveAll => {}", metrics);
        } catch (Exception e) {
            log.error("metric saveAll error", e);
        }
    }

    /**
     * Returns the metrics of one resource of one application whose {@code gmtCreate} lies in
     * {@code [startTime, endTime]} (both bounds inclusive).
     *
     * @param app       application name; a null or empty value yields an empty list
     * @param resource  resource name; a null or empty value yields an empty list
     * @param startTime inclusive lower bound compared against {@code gmtCreate}
     * @param endTime   inclusive upper bound compared against {@code gmtCreate}
     * @return the matching metrics, possibly empty
     */
    @Override
    public List<MetricEntity> queryByAppAndResourceBetween(String app, String resource, long startTime, long endTime) {
        if (isEmpty(app) || isEmpty(resource)) {
            return Lists.newArrayList();
        }
        // String values must be wrapped in single quotes: double quotes or no quotes do not work.
        // quote() adds the quotes and escapes the value so it cannot terminate the literal early.
        // The numeric bounds use %s on purpose so they are rendered with plain ASCII digits.
        String influxQl = String.format(
                "select * from %s where app=%s and resource=%s and gmtCreate >= %s and gmtCreate <= %s",
                this.measurement, quote(app), quote(resource), startTime, endTime);
        QueryResult result = influxDB.query(new Query(influxQl));
        log.debug("metric queryByAppAndResourceBetween => {}", result);
        List<InfluxDbMetricEntity> list = this.resultMapper.toPOJO(result, InfluxDbMetricEntity.class, this.measurement);
        return list.stream().map(InfluxDbMetricEntity::toMetricEntity).collect(Collectors.toList());
    }

    /**
     * Lists the resource names of an application that reported metrics during the last
     * 15 minutes, ordered by accumulated block QPS descending and then by accumulated pass QPS
     * descending.
     *
     * @param app application name; a null or empty value yields an empty list
     * @return the ordered resource names, possibly empty
     */
    @Override
    public List<String> listResourcesOfApp(String app) {
        if (isEmpty(app)) {
            return Lists.newArrayList();
        }
        long period = System.currentTimeMillis() - RESOURCE_LOOKBACK_MILLIS;
        String influxQl = String.format("select * from %s where app = %s and gmtCreate > %s",
                this.measurement, quote(app), period);
        QueryResult result = influxDB.query(new Query(influxQl));
        log.debug("metric listResourcesOfApp => {}", result);
        List<InfluxDbMetricEntity> rs = this.resultMapper.toPOJO(result, InfluxDbMetricEntity.class, this.measurement);
        if (CollectionUtils.isEmpty(rs)) {
            return Lists.newArrayList();
        }

        // Fold all rows of the same resource into one accumulated entity per resource.
        Map<String, InfluxDbMetricEntity> resourceCount = new HashMap<>(32);
        for (InfluxDbMetricEntity metricEntity : rs) {
            String resource = metricEntity.getResource();
            InfluxDbMetricEntity accumulated = resourceCount.get(resource);
            if (accumulated == null) {
                // Store a copy so that accumulating never mutates the mapped query rows.
                resourceCount.put(resource, InfluxDbMetricEntity.copyOf(metricEntity));
                continue;
            }
            accumulated.addPassQps(metricEntity.getPassQps());
            accumulated.addRtAndSuccessQps(metricEntity.getRt(), metricEntity.getSuccessQps());
            accumulated.addBlockQps(metricEntity.getBlockQps());
            accumulated.addExceptionQps(metricEntity.getExceptionQps());
            accumulated.addCount(1);
        }

        return resourceCount.entrySet()
                .stream()
                .sorted((o1, o2) -> {
                    InfluxDbMetricEntity e1 = o1.getValue();
                    InfluxDbMetricEntity e2 = o2.getValue();
                    // Operands are swapped (e2 vs e1) to obtain descending order.
                    int t = e2.getBlockQps().compareTo(e1.getBlockQps());
                    if (t != 0) {
                        return t;
                    }
                    return e2.getPassQps().compareTo(e1.getPassQps());
                })
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    /**
     * Validates the configuration, creates the database when {@code show databases} does not
     * list it, then selects the database and enables batching on the client.
     *
     * @throws IllegalArgumentException when the database or measurement name is empty
     * @throws InfluxDBException        when listing or creating the database reports an error
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.hasLength(this.database, "influxDB database invalid!");
        Assert.hasLength(this.measurement, "influxDB measurement invalid!");

        QueryResult rs = this.influxDB.query(new Query("show databases"));
        log.debug("influxDB database check => {}", rs);
        if (rs.hasError()) {
            throw new InfluxDBException(rs.getError());
        }
        if (databaseNotExist(rs)) {
            rs = this.influxDB.query(new Query("create database " + this.database));
            log.info("create influxDB database => {} result => {}", this.database, rs);
            // Fail fast: without the database every later write and query would fail anyway.
            if (rs.hasError()) {
                throw new InfluxDBException(rs.getError());
            }
        }

        this.influxDB.setDatabase(this.database);
        this.influxDB.enableBatch();
        log.info("influxDB setDatabase => {}", this.database);
    }

    /**
     * Tells whether the configured database is missing from a {@code show databases} result.
     * A result without results, series or values is treated as "database not found".
     *
     * @param rs result of the {@code show databases} query
     * @return {@code true} when no returned value equals the configured database name
     */
    private boolean databaseNotExist(QueryResult rs) {
        List<QueryResult.Result> results = rs.getResults();
        if (CollectionUtils.isEmpty(results)) {
            return true;
        }
        List<QueryResult.Series> series = results.get(0).getSeries();
        if (CollectionUtils.isEmpty(series)) {
            return true;
        }
        List<List<Object>> values = series.get(0).getValues();
        if (CollectionUtils.isEmpty(values)) {
            return true;
        }
        return values.stream()
                .flatMap(List::stream)
                .noneMatch(d -> this.database.equals(String.valueOf(d)));
    }

    /** Converts a metric into a point of the configured measurement. */
    private Point toPoint(MetricEntity metric) {
        return Point.measurement(measurement)
                .addFieldsFromPOJO(InfluxDbMetricEntity.convert(metric))
                .build();
    }

    /** Returns {@code true} for a null or zero-length string. */
    private static boolean isEmpty(String value) {
        return value == null || value.isEmpty();
    }

    /**
     * Renders a value as a single-quoted InfluxQL string literal, escaping backslash, single
     * quote and newline so the value cannot terminate the literal early.
     *
     * @param value the raw, non-null string value
     * @return the quoted and escaped literal, e.g. {@code 'it\'s'} for {@code it's}
     */
    private static String quote(String value) {
        StringBuilder literal = new StringBuilder(value.length() + 2);
        literal.append('\'');
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '\\':
                    literal.append("\\\\");
                    break;
                case '\'':
                    literal.append("\\'");
                    break;
                case '\n':
                    literal.append("\\n");
                    break;
                default:
                    literal.append(c);
                    break;
            }
        }
        return literal.append('\'').toString();
    }
}
